;; This file is part of scheme-GNUnet.
;; Copyright (C) 2021 Maxime Devos
;;
;; scheme-GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; scheme-GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL3.0-or-later

(use-modules (gnu gnunet mq-impl stream)
	     (gnu gnunet mq)
	     (gnu gnunet mq handler)
	     (gnu gnunet utils hat-let)
	     (gnu gnunet utils bv-slice)
	     (gnu gnunet concurrency repeated-condition)
	     (fibers conditions)
	     (fibers operations)
	     (fibers)
	     (rnrs bytevectors)
	     ((rnrs io ports) #:select (open-bytevector-input-port))
	     ((rnrs base) #:select (assert))
	     (srfi srfi-26)
	     (srfi srfi-43)
	     (rnrs io ports)
	     (ice-9 binary-ports)
	     (ice-9 suspendable-ports)
	     (ice-9 control))

(define (no-sender . _)
  (error "no sender!"))

(define no-handlers (message-handlers))

(define (no-error-handler . _)
  (error "no error handler!"))

(test-begin "mq-stream")

(define (check-slice-equal slice bv)
  (let^ ((!! (assert (= (slice-length slice)
			(bytevector-length bv))))
	 (! slice-copy (make-bytevector (slice-length slice)))
	 (! copy (bv-slice/read-write slice-copy))
	 (<-- () (slice-copy! slice copy))
	 (!! (bytevector=? slice-copy bv)))
	(values)))

;; Without interposition, and the verifier always
;; returns #t.
(define (simple-handler type handle)
  (make-message-handler
   type
   (lambda (thunk) (thunk))
   (const #t)
   handle))

(test-assert "messages + eof are injected in-order"
  (let^ ((! input/bv #vu8(0 4 0 1 ; Message type 1, size 4
			    0 5 0 2 1 ; Message type 2, size 6
			    0 6 0 3 2 1)) ; Message type 3, size 7
	 (! input (open-bytevector-input-port input/bv))
	 (! received 0)
	 (! (make-handler type expected-received expected-bv)
	    (simple-handler
	     type
	     (lambda (slice)
	       (assert (equal? received expected-received))
	       (check-slice-equal slice expected-bv)
	       (set! received (+ 1 received)))))
	 (! handler/1 (make-handler 1 0 #vu8(0 4 0 1)))
	 (! handler/2 (make-handler 2 1 #vu8(0 5 0 2 1)))
	 (! handler/3 (make-handler 3 2 #vu8(0 6 0 3 2 1)))
	 (! handlers
	    (message-handlers handler/1 handler/2 handler/3))
	 (! (error-handler . arguments)
	    (assert (equal? received 3))
	    (assert (equal? arguments '(input:regular-end-of-file)))
	    (set! received 'end-of-file))
	 (! mq (make-message-queue handlers error-handler no-sender))
	 (<-- () (handle-input! mq input)))
	;; TODO: should the port be closed?
	(assert (equal? received 'end-of-file))))

(test-assert "overly small message is detected (--> stop)"
  (let^ ((! input/bv #vu8(0 4 0 0 ; Message type 0, size 4
			    0 3 9 ; Overly small message, size 3, type != 0
			    0 4 0 1)) ; Message type 1, size 4
	 ;; The first message is well-formatted and should therefore
	 ;; be injected.  The second one isn't, so an appropriate error should
	 ;; injected.  Then the message stream is broken, so the third
	 ;; message shouldn't be injected.
	 (! input (open-bytevector-input-port input/bv))
	 (! received 0)
	 (! handler/0
	    (simple-handler 0
			    (lambda (slice)
			      (assert (equal? received 0))
			      (check-slice-equal slice #vu8(0 4 0 0))
			      (set! received 1))))
	 (! handlers
	    (message-handlers handler/0))
	 (! (error-handler . arguments)
	    (assert (equal? received 1))
	    ;; Whether this malformed even has a message type is dubious,
	    ;; but if it has one, it will be (* 256 9).
	    (assert (equal? arguments `(input:overly-small ,(* 256 9) 3)))
	    (set! received 'overly-small))
	 (! mq (make-message-queue handlers error-handler no-sender))
	 (<-- () (handle-input! mq input)))
	(assert (equal? received 'overly-small))))

(test-assert "premature eof is detected (--> stop)"
  (let^ ((! input/bv #vu8(0 8 7 6 5 4))
	 (! input (open-bytevector-input-port input/bv))
	 (! received #f)
	 (! (error-handler . arguments)
	    (assert (eq? received #f))
	    (assert (equal? arguments '(input:premature-end-of-file)))
	    (set! received #t))
	 (! mq (make-message-queue no-handlers error-handler no-sender))
	 (<-- () (handle-input! mq input)))
	(assert (equal? received #t))))

(test-equal "envelopes are written (no blocking)"
  ;; Three messages
  #vu8(0 4 0 1
	 0 4 0 2
	 0 4 0 3)
  (let^ ((! messages #(#vu8(0 4 0 1)
			   #vu8(0 4 0 2)
			   #vu8(0 4 0 3)))
	 (<-- (port get-bytevector) (open-bytevector-output-port))
	 (! mq (make-message-queue no-handlers no-error-handler
				   (lambda (_) (values))))
	 (! (insert-message index message)
	    (send-message! mq (slice/read-only (bv-slice/read-write message))))
	 (<-- ()
	      (begin
		(vector-for-each insert-message messages)
		(values)))
	 (<-- ()
	      ;; The implementation detail that 'send-round'
	      ;; is called before 'wait!' is assumed here.
	      (let/ec ec
		(handle-output! mq port ec)
		(error "unreachable"))))
	(get-bytevector)))

(define (blocking-output-port port . block-positions)
  (define (close)
    (close-port port))
  (define (write! bv index length)
    (define p (port-position port))
    (if (or (null? block-positions)
	    (< (+ p length) (car block-positions)))
	(begin (put-bytevector port bv index length) length)
	(let ((short (- (car block-positions) p)))
	  (put-bytevector port bv index short)
	  ((current-write-waiter) port/blocking)
	  (set! block-positions (cdr block-positions))
	  short)))
  (define port/blocking
    (make-custom-binary-output-port "" write! #f #f close))
  (setvbuf port/blocking 'none)
  port/blocking)

;; The ‘blocking’ is to make this test case more interesting.
;; It does not currently have any effect, but it is expected
;; that the implementation of handle-output! will be changed
;; to react to blocking, for implementing message queue
;; shutdown.

(test-equal "repeatable conditions can be used (blocking)"
  '(#vu8(0 4 0 1   0 4 0 2) . 4) ; 4: number of times writing blocks
  (let^ ((! rcvar (make-repeated-condition))
	 (! stop? (make-condition))
	 (! stopped? (make-condition))
	 (! (interrupt! mq)
	    (trigger-condition! rcvar))
	 (! escape/output (make-parameter #f))
	 (<-- (out/internal get-bytevector)
	      (open-bytevector-output-port))
	 ;; block writing a few times
	 (! out (blocking-output-port out/internal 0 1 3 7))
	 (! (wait!)
	    (perform-operation
	     (apply choice-operation
		    (prepare-await-trigger! rcvar)
		    (if (>= 8 (port-position out/internal))
			(list (wrap-operation
			       (wait-operation stop?)
			       (lambda () ((escape/output)))))
			'()))))
	 (! mq (make-message-queue no-handlers no-error-handler interrupt!))
	 (! n/blocked 0)
	 (! message/1 #vu8(0 4 0 1))
	 (! message/2 #vu8(0 4 0 2)))
	(run-fibers
	 (lambda ()
	   (spawn-fiber
	    (lambda ()
	      (let/ec ec
		(parameterize ((escape/output ec)
			       (current-write-waiter
				(lambda (port)
				  (cond ((eq? port out)
					 (set! n/blocked (+ n/blocked 1)))
					((file-port? port)
					 ;; XXX ‘Attempt to suspend fiber within
					 ;; continuaton barrier’
					 #;((@@ (fibers) wait-for-writable) port)
					 (select '() (list port) '()))))))
		  (handle-output! mq out wait!)))
	      (signal-condition! stopped?)))
	   (send-message! mq (bv-slice/read-write message/1))
	   (sleep 0.001)
	   (send-message! mq (bv-slice/read-write message/2))
	   (sleep 0.001)
	   (signal-condition! stop?)
	   (wait stopped?)
	   (cons (get-bytevector) n/blocked))
	 #:parallelism 1
	 #:hz 0)))

(test-end "mq-stream")
